Feed Adapter

Spring Integration provides support for syndication through feed adapters. The implementation is based on the ROME Framework.

You need to include this dependency into your project:

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-feed</artifactId>
    <version>5.5.12</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-feed:5.5.12"

Web syndication is a way to publish material such as news stories, press releases, blog posts, and other items typically available on a website but also made available in a feed format such as RSS or ATOM.

Spring integration provides support for web syndication through its 'feed' adapter and provides convenient namespace-based configuration for it. To configure the 'feed' namespace, include the following elements within the headers of your XML configuration file:

xmlns:int-feed="http://www.springframework.org/schema/integration/feed"
xsi:schemaLocation="http://www.springframework.org/schema/integration/feed
	https://www.springframework.org/schema/integration/feed/spring-integration-feed.xsd"

Feed Inbound Channel Adapter

The only adapter you really need to provide support for retrieving feeds is an inbound channel adapter. It lets you subscribe to a particular URL. The following example shows a possible configuration:

<int-feed:inbound-channel-adapter id="feedAdapter"
        channel="feedChannel"
        url="https://feeds.bbci.co.uk/news/rss.xml">
    <int:poller fixed-rate="10000" max-messages-per-poll="100" />
</int-feed:inbound-channel-adapter>

In the preceding configuration, we are subscribing to a URL identified by the url attribute.

As news items are retrieved, they are converted to messages and sent to a channel identified by the channel attribute. The payload of each message is a com.sun.syndication.feed.synd.SyndEntry instance. Each one encapsulates various data about a news item (content, dates, authors, and other details).

The inbound feed channel adapter is a polling consumer. That means that you must provide a poller configuration. However, one important thing you must understand with regard to a feed is that its inner workings are slightly different then most other polling consumers. When an inbound feed adapter is started, it does the first poll and receives a com.sun.syndication.feed.synd.SyndEntryFeed instance. That object contains multiple SyndEntry objects. Each entry is stored in the local entry queue and is released based on the value in the max-messages-per-poll attribute, such that each message contains a single entry. If, during retrieval of the entries from the entry queue, the queue has become empty, the adapter attempts to update the feed, thereby populating the queue with more entries (SyndEntry instances), if any are available. Otherwise the next attempt to poll for a feed is determined by the trigger of the poller (every ten seconds in the preceding configuration).

Duplicate Entries

Polling for a feed can result in entries that have already been processed (“I already read that news item, why are you showing it to me again?”). Spring Integration provides a convenient mechanism to eliminate the need to worry about duplicate entries. Each feed entry has a “published date” field. Every time a new Message is generated and sent, Spring Integration stores the value of the latest published date in an instance of the MetadataStore strategy (see Metadata Store).

The key used to persist the latest published date is the value of the (required) id attribute of the feed inbound channel adapter component plus the feedUrl (if any) from the adapter’s configuration.

Other Options

Starting with version 5.0, the deprecated com.rometools.fetcher.FeedFetcher option has been removed and an overloaded FeedEntryMessageSource constructor for an org.springframework.core.io.Resource is provided. This is useful when the feed source is not an HTTP endpoint but is any other resource (such as local or remote on FTP). In the FeedEntryMessageSource logic, such a resource (or provided URL) is parsed by the SyndFeedInput to the SyndFeed object for the processing mentioned earlier. You can also inject a customized SyndFeedInput (for example, with the allowDoctypes option) instance into the FeedEntryMessageSource.

If the connection to the feed needs some customization, e.g. connection and read timeouts, the org.springframework.core.io.UrlResource extension with its customizeConnection(HttpURLConnection) override has to be used instead of plain URL injection into the FeedEntryMessageSource. For example:

@Bean
@InboundChannelAdapter("feedChannel")
FeedEntryMessageSource feedEntrySource() {
    UrlResource urlResource =
	    new UrlResource(url) {

	        @Override
	        protected void customizeConnection(HttpURLConnection connection) throws IOException {
	            super.customizeConnection(connection);
	            connection.setConnectTimeout(10000);
	            connection.setReadTimeout(5000);
	        }
	    };
    return new FeedEntryMessageSource(urlResource, "myKey");
}

Java DSL Configuration

The following Spring Boot application shows an example of how to configure the inbound adapter with the Java DSL:

@SpringBootApplication
public class FeedJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FeedJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Value("org/springframework/integration/feed/sample.rss")
    private Resource feedResource;

    @Bean
    public MetadataStore metadataStore() {
        PropertiesPersistingMetadataStore metadataStore = new PropertiesPersistingMetadataStore();
        metadataStore.setBaseDirectory(tempFolder.getRoot().getAbsolutePath());
        return metadataStore;
    }

    @Bean
    public IntegrationFlow feedFlow() {
        return IntegrationFlows
                .from(Feed.inboundAdapter(this.feedResource, "feedTest")
                                .metadataStore(metadataStore()),
                        e -> e.poller(p -> p.fixedDelay(100)))
                .channel(c -> c.queue("entries"))
                .get();
    }

}